package org.fjsei.yewu.repository.maint;

import graphql.schema.DataFetchingEnvironment;
import jakarta.persistence.*;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import md.system.User;
import org.fjsei.yewu.filter.UserBase;
import org.fjsei.yewu.filter.Uunode;
import org.fjsei.yewu.jpa.PageOffsetFirst;
import org.fjsei.yewu.jpa.ProjectionRepository;
import org.fjsei.yewu.pojo.SliceSyncRes;
import org.fjsei.yewu.util.Tool;
import org.jgroups.util.Triple;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.data.domain.Sort;
import org.springframework.data.repository.CrudRepository;
import org.springframework.data.repository.PagingAndSortingRepository;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.function.Function;

//import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
//import static org.elasticsearch.index.query.QueryBuilders.matchPhraseQuery;

/** 后台作业模式：5分钟内最少更新后台状态一次。否则超时释放作业锁。其它会话可以发起新作业。获取会话ID返回给前端，前端把会话ID++回送回来。
 * autid会话 认证是一致或者5分钟解锁的，启动下一次时间片作业｛offset为起点,最多运行到limit记录完成｝。
 * JobManager 表明不行？
 * 加上indexes={@Index(columnList = "name")索引：避免小强数据库事务争用：没加spans: FULL SCAN  distribution: full，加后spans: [/'ES' - ] distribution: local；
*/

@Slf4j
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@Entity
@Table(indexes={@Index(columnList = "name"), } )
//默认生成策略表名是=jslicemang;
public class JsliceMang implements Uunode {
    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    @Column(name = "id")
    private UUID id;
    //为了 分片任务允许被别人抢去； CRDB?能用吗？
    //【问题】加version乐观锁实际对CRDB没意义了，CRDB小强数据库的内置事务隔离等级更高。IDEA的数据库工具会无意识添加事务很可能导致后端程序事务超慢或失败，@必须重启动IDEA。
    @Version
    @Column(name = "version")
    private int  version;

    //数据库 直接配置的：
    @Column(name = "name")
    private String name;    //作业标识代码；｛｝
    @Column(name = "last")
    private LocalDateTime last;   //最近运行, 5分钟内最少更新后台状态一次。5分钟之后可以解锁返回uuid
    /**若是ES的search after深度翻页方式运作的：该字段改为当前已经扫描的个数，而不是准备读取的分页偏移量
     * 关系数据库深度翻页都很慢，ES不支持from size深度翻页。
     * */
    @Column(name = "offs")
    private Integer  offs;      //从=0开始的；
    //特别坑！！ limit 名字不能用，无法创建表啊！
    @Column(name = "limt")
    private Integer  limt;    //作业 分片大小。
    @Column(name = "totl")
    private Integer  totl;
    @Column(name = "auid")
    private String  auid;    //会话ID++ uuid对接认证, 客户接管需要id;
    //还是增加 一道密码。不要和UUID碰撞的密码！ 比如14be54ef6ee84755a5b16b28cea881f3
    //保留关键字： "sure" 不能用作密码;
    @Column(name = "pasw")
    private String  pasw;
    /**并发：本通道最小的那个ID*/
    private UUID idFirst;
    /**并发：本通道最大的那ID*/
    private UUID idLast;

    /**深度翻页很慢，需要改成searchAfter模式可提高性能；
    ES翻页的上次复合排序多字段json;
    批作业当前的起始搜索ID ,目标_id > ID。
     * 若想批量处理大的某实体表：用数据库的分页查询很慢的，目前可用:办法1：是导出DB数据再离线大数据批作业形式；办法2：有ES相应实体的走ES搜索方式，ES数据也是批量同步和重建索引得到的。
     * 两个办法都是需要很长时间初始化数据集的。性能负担高！
     * 这里是配合ES的search after而采用的排序_id，用于分片作业的连贯分页查询。
     * 作业首次启动该字段=null；随后每一次也就是每个单通道单个线程的作业程序都是从这个id往ASC升序检索，
     * 多个通道如何：只能分开多个type作业类别了。类比OD.5多通道做法。第二个通道的首次作业启动的本字段就不是null的了，必须设定第一个通道的最大可能id;第三个通道...排序字段确保一致；
     * 可实际作业不一定是ID作为排序定位的，其它多个字段联合排序定位的/Group By?。
     * */
    private String  sortat;

    /** 非空表示有错误。
     * 状态 : [特别形式]=反馈给前端的 操作结果; 无需要保存数据库的运行生成字段。
    * */
    @Transient
    private String desc;

    /**当前任务操作人; 谁发起的任务。
     * 加(fetch= FetchType.LAZY) 会报错
     * */
    @ManyToOne
    private User fireor;

    /** [安全要求] 以UserBase 替换掉 java实体的 User类型;
     * 这样子，graphQL前端可定没法子获取不属于UserBase的其余字段信息。
     * */
    public UserBase fireor() {
        return  fireor;
    }

    //因为特例JsliceMang syncEqpFromLegacy ：允许ID=null;
    @Override
    public  String getId(DataFetchingEnvironment env){
        String typeName= this.getClass().getSimpleName();
        return (null==this.getId()? null : Tool.toGlobalId(typeName, this.getId()) );
    }
//    public String id() {
//        return Tool.toGlobalId(this.getClass().getSimpleName(), String.valueOf(this.id));
//    }
    /**运行一个分片时间的作业：
     * T 是分片操作的实体类。
     * pagingAndSortingRepository 输入是一张表；顺序分片查看整个表。 sliceJob是真正操作的函数。
     * */
    public <T extends SliceSyncRes, RP extends PagingAndSortingRepository<T,?> & CrudRepository<T,?>> boolean  run(RP pagingAndSortingRepository, Function<SliceSyncRes, Boolean> sliceJob){
        Pageable pageable= PageOffsetFirst.of(this.getOffs(),this.getLimt());
        //分片任务，保证可以重复执行，确保findAll读取出来的记录有顺序。
        log.info("run:作业{},Off={}", name, this.getOffs());
        Iterable<T>  pall= pagingAndSortingRepository.findAll(pageable);
        int count=0;
        for (T parent:pall) {
            Boolean execOK=true;
            try {
                //运行了才报错org.fjsei.yewu.entity.fjtj.UntMge cannot be cast to <T> class org.fjsei.yewu.pojo.SliceSyncRes
                execOK= sliceJob.apply(parent);
            } catch (Exception e) {
                e.printStackTrace();
                execOK= false;
            }
            if(!execOK){
                log.error("报错暂停,该后退一limit:{}",this.getOffs()+count);
                this.setDesc("报错暂停,该后退一limit");
                this.setOffs(this.getOffs() + count);
                //jsliceMangRepository.save(jobm);
                return false;
            }
            count++;
            LocalDateTime now= LocalDateTime.now();
            java.time.Duration duration = java.time.Duration.between(this.getLast(), now);
            //怕时间太长，影响任务抢占了。分片运行有效时间太短了反而没机会抢到。
            if(duration.toSeconds()>60)
                break;
        }
        this.setOffs(this.getOffs() + count);
        return true;
    }
    /**批量保存的模式； 外面加了@Transactional事务机制。
     * totl标识分片作业最后一次运行Loop,但实际作业可允许超过total对应的那一条记录。所以这里的作业必须运行可以重复执行==幂等性！
     * 接口幂等性： Insert:没有唯一业务号，用Token或其它来保证幂等  https://blog.csdn.net/BushQiang/article/details/109964043
     * 在订单的支付中，如果没有幂等性，接口的重试可能造成重复支付。
     *这里 T 是pagingAndSortingRepository的实体类型。
     *在boot2.7版本是使用PagingAndSortingRepository类型的，3.0改成RP;
     * */
    public <T, RP extends PagingAndSortingRepository<T,?> & CrudRepository<T,?>> boolean  runBatchSave(RP pagingAndSortingRepository, Function<T, Triple<?,?,?>> sliceJob,
                                     CrudRepository repository1, CrudRepository repository2){
        Pageable pageable= PageOffsetFirst.of(this.getOffs(),this.getLimt());
        //分片任务，保证可以重复执行，确保findAll读取出来的记录有顺序。
        log.info("run:作业{},Off={}", name, this.getOffs());
        //旧的是 Iterable<T>  pall= pagingAndSortingRepository.findAll(pageable);
        Slice<T> pall= (Slice<T>)pagingAndSortingRepository.findAll(pageable);
        int count=0;
        List<Object> rpl1=new ArrayList<>();
        List rpl2=new ArrayList<>();
        for (T parent:pall) {
            Triple<String, Object, Object> triple=null;
            Boolean execOK=true;
            try {
                //返回需要批量存储仓库的实体1，2：
                triple= (Triple<String, Object, Object>) sliceJob.apply(parent);
                Assert.isTrue(null!=triple,"必须triple");
                String result= triple.getVal1();
                //错误提示result，不代表绝对没有生成可用数据，有些仅仅算提示。
                if(StringUtils.hasText(result) )
                    ((SliceSyncRes)parent).setFail(result);
                else
                    ((SliceSyncRes)parent).setFail(null);
                Object enty1= triple.getVal2();
                if(null!=enty1)     rpl1.add(enty1);
                Object enty2= triple.getVal3();
                if(null!=enty2)     rpl2.add(enty2);
            } catch (Exception e) {
                e.printStackTrace();
                execOK= false;
            }
            if(!execOK){
                log.error("报错暂停,该后退一limit:{}",this.getOffs()+count);
                this.setDesc("报错暂停,该后退一limit");   //报错暂停应该后退一个limit
                this.setOffs(this.getOffs() + count);
                //取消批量存储？ 一整批抛弃？原先最多弃单个
                return false;
            }
            count++;
            LocalDateTime now= LocalDateTime.now();
            java.time.Duration duration = java.time.Duration.between(this.getLast(), now);
            //怕时间太长，影响任务抢占了。分片运行有效时间太短了反而没机会抢到。
            if(duration.toSeconds()>60)
                break;
        }
        //最后一批一起保存。应该把ES批量保存和数据库事务拆开，CRDB事务只管数据库的作业，ES保存时间要拖出去。
        if(null!=repository1)   repository1.saveAll(rpl1);   //居然5秒就保存超时失败导致分片任务终止
        if(null!=repository2)   repository2.saveAll(rpl2);
        pagingAndSortingRepository.saveAll(pall);
        this.setOffs(this.getOffs() + count);
        return true;
    }

    public  JsliceMang(String name,Integer offs,Integer limt,LocalDateTime last){
        this.name=name;
        this.offs=offs;
        this.limt=limt;
        this.last= last;
    }

    /**Slice真正起作用： 去除count(*)的无意义消耗。
     * T是母实体类，P是该实体类动态投影；
     * */
    public <T,P extends SliceSyncRes> boolean  runBatchSlice(ProjectionRepository pagingAndSortingRepository, Function<T, Triple<?,?,?>> sliceJob,
                                        CrudRepository repository1, CrudRepository repository2, Class<P> type){
        Pageable pageable= PageOffsetFirst.of(this.getOffs(),this.getLimt());
        //分片任务，保证可以重复执行，确保findAll读取出来的记录有顺序。
        log.info("run:作业{},Off={}", name, this.getOffs());
        //旧的是 Slice<P> pall= (Slice<P>)pagingAndSortingRepository.readAllBy(pageable, type);
        Slice<P> pall= (Slice<P>)pagingAndSortingRepository.findAllByIdBetween(this.getIdFirst(),this.getIdLast(),pageable,type);
        int count=0;
        List rpl1=new ArrayList<>();
        List rpl2=new ArrayList<>();
        for (P parent:pall) {
            if(!"OK".equals(parent.getFail())) {
                Triple<String, Object, Object> triple = null;
                Boolean execOK = true;
                try {
                    //返回需要批量存储仓库的实体1，2：
                    triple = (Triple<String, Object, Object>) sliceJob.apply((T) parent);
                    Assert.isTrue(null != triple, "必须triple");
                    String result = triple.getVal1();
                    //错误提示result，不代表绝对没有生成可用数据，有些仅仅算提示。
                    if (StringUtils.hasText(result))
                        ((SliceSyncRes) parent).setFail(result);
                    else
                        ((SliceSyncRes) parent).setFail("OK");
                    Object enty1 = triple.getVal2();
                    if (null != enty1) rpl1.add(enty1);
                    Object enty2 = triple.getVal3();
                    if (null != enty2) rpl2.add(enty2);
                } catch (Exception e) {
                    e.printStackTrace();
                    execOK = false;
                }
                if (!execOK) {
                    log.error("报错暂停,该后退一limit:{}", this.getOffs() + count);
                    this.setDesc("报错暂停,该后退一limit");   //报错暂停应该后退一个limit
                    this.setOffs(this.getOffs() + count);
                    //取消批量存储？ 一整批抛弃？原先最多弃单个
                    return false;
                }
            }
            count++;
            LocalDateTime now= LocalDateTime.now();
            java.time.Duration duration = java.time.Duration.between(this.getLast(), now);
            //怕时间太长，影响任务抢占了。分片运行有效时间太短了反而没机会抢到。
            if(duration.toSeconds()>60)
                break;
        }
        //最后一批一起保存。应该把ES批量保存和数据库事务拆开，CRDB事务只管数据库的作业，ES保存时间要拖出去。
        if(null!=repository1)   repository1.saveAll(rpl1);   //居然5秒就保存超时失败导致分片任务终止
        if(null!=repository2)   repository2.saveAll(rpl2);
        pagingAndSortingRepository.saveAll(pall);
        this.setOffs(this.getOffs() + count);
        return true;
    }

    /**有顺序的竟然比毫无顺序的能更快：Sort.by()最好是不会轻易被别人插入的字段，id是很随意的顺序。非null生成日期;尽量没同时进行记录新插入。
     * 排序肯定比不用排序的更慢的？id排序会更快!。   #另外分页太深的性能是慢。
     * */
    public <T,P> boolean  batchOrderSlice(ProjectionRepository pagingAndSortingRepository, Function<T, Triple<?,?,?>> sliceJob,
                                          CrudRepository repository1, CrudRepository repository2, Class<P> type){
        Pageable pageable= PageOffsetFirst.of(this.getOffs(),this.getLimt(), Sort.by("id"));
        //分片任务，保证可以重复执行，确保findAll读取出来的记录有顺序。
        log.info("run:作业{},Off={}", name, this.getOffs());
        //旧的是 Iterable<T>  pall= pagingAndSortingRepository.findAll(pageable); , StoreSync.class
        //   Class<T> classT= (Class<T>) ((ParameterizedType)this.getClass().getGenericSuperclass()).getActualTypeArguments()[0];
        Slice<P> pall= (Slice<P>)pagingAndSortingRepository.readAllBy(pageable, type);
        int count=0;
        List rpl1=new ArrayList<>();
        List rpl2=new ArrayList<>();
        for (P parent:pall) {
            Triple<String, Object, Object> triple=null;
            Boolean execOK=true;
            try {
                //返回需要批量存储仓库的实体1，2：
                triple= (Triple<String, Object, Object>) sliceJob.apply((T) parent);
                Assert.isTrue(null!=triple,"必须triple");
                String result= triple.getVal1();
                //错误提示result，不代表绝对没有生成可用数据，有些仅仅算提示。
                if(StringUtils.hasText(result) )
                    ((SliceSyncRes)parent).setFail(result);
                else
                    ((SliceSyncRes)parent).setFail("OK");
                Object enty1= triple.getVal2();
                if(null!=enty1)     rpl1.add(enty1);
                Object enty2= triple.getVal3();
                if(null!=enty2)     rpl2.add(enty2);
            } catch (Exception e) {
                e.printStackTrace();
                execOK= false;
            }
            if(!execOK){
                log.error("报错暂停,该后退一limit:{}",this.getOffs()+count);
                this.setDesc("报错暂停,该后退一limit");   //报错暂停应该后退一个limit
                this.setOffs(this.getOffs() + count);
                //取消批量存储？ 一整批抛弃？原先最多弃单个
                return false;
            }
            count++;
            LocalDateTime now= LocalDateTime.now();
            java.time.Duration duration = java.time.Duration.between(this.getLast(), now);
            //怕时间太长，影响任务抢占了。分片运行有效时间太短了反而没机会抢到。
            if(duration.toSeconds()>60)
                break;
        }
        //最后一批一起保存。应该把ES批量保存和数据库事务拆开，CRDB事务只管数据库的作业，ES保存时间要拖出去。
        if(null!=repository1)   repository1.saveAll(rpl1);   //居然5秒就保存超时失败导致分片任务终止
        if(null!=repository2)   repository2.saveAll(rpl2);
        pagingAndSortingRepository.saveAll(pall);
        this.setOffs(this.getOffs() + count);
        return true;
    }

}

